#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <vector>
#include "scopelock.h"
#include "verify.h"
#include "pollmgr.h"

// Guard handed to pthread_once() in PollMgr::Instance() so that
// PollMgr::PollMgrInit() is run through it only once.
static pthread_once_t gs_init_once = PTHREAD_ONCE_INIT;
// The singleton object; NULL until PollMgrInit() has assigned it.
PollMgr *PollMgr::_instance = NULL;

// TODO: how can a parameter be passed to the constructor from here?
void PollMgr::PollMgrInit()
{
	_instance = new PollMgr();
}

// Returns the process-wide PollMgr.
// The object is created on the first call: pthread_once() runs
// PollMgrInit() through gs_init_once, after which _instance is returned.
PollMgr *PollMgr::Instance()
{
	void (*const init_routine)() = &PollMgr::PollMgrInit;
	pthread_once(&gs_init_once, init_routine);

	PollMgr *const singleton = _instance;
	return singleton;
}

// Builds the poll manager: initializes the mutex _m, creates the EpollIO
// poller and allocates a zero-filled table of MAX_POLLER_SIZE Pollable
// pointers indexed by file descriptor.
//
// max_callback is not used by this constructor.
//
// Every operation with a side effect is performed outside VERIFY() and only
// its result is checked, so initialization still happens if VERIFY() is ever
// compiled out the way assert() is.
// NOTE(review): verify.h is not visible here - confirm how VERIFY behaves in
// release builds.
PollMgr::PollMgr(int max_callback)
{
	(void)max_callback;

	const int rc = pthread_mutex_init(&_m, NULL);
	VERIFY(rc == 0);
	(void)rc;	// silences "unused" if VERIFY expands to nothing

	_poller = new EpollIO(MAX_POLLER_SIZE);
	VERIFY(_poller != NULL);

	// calloc() zero-fills the table, so every slot starts out as NULL.
	_pollables = (Pollable **)calloc(MAX_POLLER_SIZE, sizeof(Pollable*));
	VERIFY(_pollables != NULL);
}

// Tears the manager down: destroys the mutex, deletes the poller and
// releases the fd -> Pollable lookup table.
//
// The Pollable objects referenced from the table are NOT released here.
// They are handed in by callers through AddPollable() and are C++ objects
// that this class never allocated; passing them to free() would mismatch
// their allocator and skip their destructors.
// NOTE(review): this treats registered Pollables as caller-owned - confirm
// that no caller relies on the manager to dispose of them.
PollMgr::~PollMgr()
{
	// Keep the side effect outside VERIFY() so it runs even if VERIFY() is
	// compiled out.
	const int rc = pthread_mutex_destroy(&_m);
	VERIFY(rc == 0);
	(void)rc;

	delete _poller;
	free(_pollables);
}

// Registers pb as the handler for fd and asks the poller to watch fd for the
// events in flag.
//
// Returns false, without touching the poller, when fd is outside
// [0, MAX_POLLER_SIZE) or when a different Pollable is already registered
// for fd. Returns true otherwise; registering the same pb again for the same
// fd is allowed (used to add another watch flag).
bool PollMgr::AddPollable(int fd, PollFlag flag, Pollable *pb)
{
	// _pollables has MAX_POLLER_SIZE slots, so the last valid index is
	// MAX_POLLER_SIZE - 1; negative descriptors are never valid.
	if (fd < 0 || fd >= MAX_POLLER_SIZE)
		return false;

	ScopeLock m(&_m);
	// Check ownership first so that a rejected request leaves the watch set
	// exactly as it was.
	if (_pollables[fd] && _pollables[fd] != pb)
		return false;

	_poller->WatchFd(fd, flag);
	_pollables[fd] = pb;

	return true;
}

// Stops watching fd for the events in flag on behalf of pb.
//
// Returns false when fd is outside [0, MAX_POLLER_SIZE), when nothing is
// registered for fd, or when the registered Pollable is not pb. Returns true
// otherwise. The table slot is cleared only when UnwatchFd() returns true.
// NOTE(review): presumably UnwatchFd() returns true once no watch flags
// remain for fd - confirm against EpollIO.
bool PollMgr::DelPollable(int fd, PollFlag flag, Pollable *pb)
{
	// _pollables has MAX_POLLER_SIZE slots, so the last valid index is
	// MAX_POLLER_SIZE - 1; negative descriptors are never valid.
	if (fd < 0 || fd >= MAX_POLLER_SIZE)
		return false;

	ScopeLock m(&_m);
	if (!_pollables[fd] || _pollables[fd] != pb)
		return false;
	if (_poller->UnwatchFd(fd, flag))
		_pollables[fd] = NULL;

	return true;
}

void PollMgr::WaitLoop(int timeout)
{
	std::vector<int> readable;
	std::vector<int> writeable;

	while(1) {

		readable.clear();
		writeable.clear();

		_poller->WaitEvents(&readable, &writeable, timeout);
		if (!readable.size() && !writeable.size()) {
			continue;
		}
		for (size_t i = 0; i < readable.size(); ++i) {
			int fd = readable[i];
			if (_pollables[fd]) {
				_pollables[fd]->HandleRead(fd);
			}
		}
		for (size_t i = 0; i < writeable.size(); ++i) {
			int fd = writeable[i];
			if (_pollables[fd]) {
				_pollables[fd]->HandleWrite(fd);
			}
		}
	}
}

#ifdef _COLIN_TEST_POLLMGR_
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <errno.h>
// Test Pollable: a non-blocking UDP echo endpoint on port 9090.
// HandleRead() stores one datagram and the sender's address, then asks
// PollMgr to watch the socket for writability; HandleWrite() sends the stored
// datagram back to that address and removes the write watch again.
class Dispatcher : public Pollable
{
	public:	
		Dispatcher() : _size(0), _fd(-1) {
			memset(&stCltAddr, 0, sizeof(stCltAddr));
		}
		~Dispatcher() { CloseFd(); }

		// Creates the UDP socket, switches it to non-blocking mode and binds
		// it to INADDR_ANY:9090.
		// Returns true on success. On any failure the error is reported with
		// perror(), the socket is closed and false is returned.
		bool Init() {
			CloseFd();	// do not leak a descriptor if Init() is called twice

			_fd = socket(AF_INET, SOCK_DGRAM, 0);
			if (_fd < 0) {
				perror("socket error");
				return false;
			}

			const int fl = fcntl(_fd, F_GETFL, 0);
			if (fl < 0 || fcntl(_fd, F_SETFL, fl | O_NONBLOCK) < 0) {
				perror("fcntl error");
				CloseFd();
				return false;
			}

			struct sockaddr_in stAddr;
			memset(&stAddr, 0, sizeof(stAddr));
			stAddr.sin_family = AF_INET;
			stAddr.sin_addr.s_addr = htonl(INADDR_ANY);
			stAddr.sin_port = htons(9090);
			if (bind(_fd, (struct sockaddr *)&stAddr, sizeof(stAddr)) < 0) {
				perror("bind error");
				CloseFd();
				return false;
			}
			return true;
		}

		// Receives one datagram into _buffer.
		// Returns the number of bytes received, 0 when the call should simply
		// be retried later (EINTR, or nothing to read on the non-blocking
		// socket), and -1 when fd is not this object's socket or recvfrom()
		// failed. The echo is scheduled only after a successful receive.
		int HandleRead(int fd) {
			if (fd != _fd) 
				return -1;
			socklen_t clt_len = sizeof(stCltAddr);
			const ssize_t got = recvfrom(fd, _buffer, sizeof(_buffer), 0, (struct sockaddr *)&stCltAddr, &clt_len);
			if (got < 0) {
				if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
					return 0;
				perror("recvfrom error");
				// _size is left untouched so that a negative length can never
				// reach sendto().
				return -1;
			}
			_size = (int)got;
			PollMgr::Instance()->AddPollable(fd, POLLER_WR, this); 
			return _size;
		}

		// Sends the stored datagram back to the stored client address.
		// Returns the number of bytes sent, 0 when the send should be retried
		// later (EINTR, or the non-blocking socket cannot take data yet; the
		// write watch is kept), and -1 when fd is not this object's socket or
		// sendto() failed (the write watch is removed).
		int HandleWrite(int fd) {
			if (fd != _fd) 
				return -1;
			int bs = sendto(fd, _buffer, _size, 0, (struct sockaddr *)&stCltAddr, sizeof(stCltAddr));
			if (bs < 0) {
				if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
					return 0;
				perror("sendto error");
			}
			PollMgr::Instance()->DelPollable(fd, POLLER_WR, this); 
			return bs;
		}

		// Socket descriptor, or -1 when no socket is open.
		int GetFD() const { return _fd; }

	private:
		// Closes the socket if one is open and marks the object as unbound.
		void CloseFd() {
			if (_fd >= 0) {
				close(_fd);
				_fd = -1;
			}
		}

		struct sockaddr_in stCltAddr;	// sender of the most recent datagram
		char _buffer[4096];		// payload of the most recent datagram
		int _size;			// number of valid bytes in _buffer
		int _fd;			// UDP socket, -1 when not open
};

int main(int argc, char **argv)
{
	Dispatcher *udp = new Dispatcher();
	udp->Init();
	PollMgr::Instance()->AddPollable(udp->GetFD(), POLLER_RD, udp);
	PollMgr::Instance()->WaitLoop(5000);
	return 0;
}
#endif
